/*
 * Copyright (C) 2016 Square, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package okio;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public final class PipeTest {
  final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);

  @After public void tearDown() throws Exception {
    executorService.shutdown();
  }

  @Test public void test() throws Exception {
    Pipe pipe = new Pipe(6);
    pipe.sink().write(new Buffer().writeUtf8("abc"), 3L);

    Source source = pipe.source();
    Buffer readBuffer = new Buffer();
    assertEquals(3L, source.read(readBuffer, 6L));
    assertEquals("abc", readBuffer.readUtf8());

    pipe.sink().close();
    assertEquals(-1L, source.read(readBuffer, 6L));

    source.close();
  }

  /**
   * A producer writes the first 16 MiB of bytes generated by {@code new Random(0)} to a sink, and a
   * consumer consumes them. Both compute hashes of their data to confirm that they're as expected.
   */
  @Test public void largeDataset() throws Exception {
    final Pipe pipe = new Pipe(1000L); // An awkward size to force producer/consumer exchange.
    final long totalBytes = 16L * 1024L * 1024L;
    ByteString expectedHash = ByteString.decodeHex("7c3b224bea749086babe079360cf29f98d88262d");

    // Write data to the sink.
    Future<ByteString> sinkHash = executorService.submit(new Callable<ByteString>() {
      @Override public ByteString call() throws Exception {
        HashingSink hashingSink = HashingSink.sha1(pipe.sink());
        Random random = new Random(0);
        byte[] data = new byte[8192];

        Buffer buffer = new Buffer();
        for (long i = 0L; i < totalBytes; i += data.length) {
          random.nextBytes(data);
          buffer.write(data);
          hashingSink.write(buffer, buffer.size());
        }

        hashingSink.close();
        return hashingSink.hash();
      }
    });

    // Read data from the source.
    Future<ByteString> sourceHash = executorService.submit(new Callable<ByteString>() {
      @Override public ByteString call() throws Exception {
        Buffer blackhole = new Buffer();
        HashingSink hashingSink = HashingSink.sha1(blackhole);

        Buffer buffer = new Buffer();
        while (pipe.source().read(buffer, Long.MAX_VALUE) != -1) {
          hashingSink.write(buffer, buffer.size());
          blackhole.clear();
        }

        pipe.source().close();
        return hashingSink.hash();
      }
    });

    assertEquals(expectedHash, sinkHash.get());
    assertEquals(expectedHash, sourceHash.get());
  }

  @Test public void sinkTimeout() throws Exception {
    TestUtil.INSTANCE.assumeNotWindows();

    Pipe pipe = new Pipe(3);
    pipe.sink().timeout().timeout(1000, TimeUnit.MILLISECONDS);
    pipe.sink().write(new Buffer().writeUtf8("abc"), 3L);
    double start = now();
    try {
      pipe.sink().write(new Buffer().writeUtf8("def"), 3L);
      fail();
    } catch (InterruptedIOException expected) {
      assertEquals("timeout", expected.getMessage());
    }
    assertElapsed(1000.0, start);

    Buffer readBuffer = new Buffer();
    assertEquals(3L, pipe.source().read(readBuffer, 6L));
    assertEquals("abc", readBuffer.readUtf8());
  }

  @Test public void sourceTimeout() throws Exception {
    TestUtil.INSTANCE.assumeNotWindows();

    Pipe pipe = new Pipe(3L);
    pipe.source().timeout().timeout(1000, TimeUnit.MILLISECONDS);
    double start = now();
    Buffer readBuffer = new Buffer();
    try {
      pipe.source().read(readBuffer, 6L);
      fail();
    } catch (InterruptedIOException expected) {
      assertEquals("timeout", expected.getMessage());
    }
    assertElapsed(1000.0, start);
    assertEquals(0, readBuffer.size());
  }

  /**
   * The writer is writing 12 bytes as fast as it can to a 3 byte buffer. The reader alternates
   * sleeping 1000 ms, then reading 3 bytes. That should make for an approximate timeline like
   * this:
   *
   *    0: writer writes 'abc', blocks 0: reader sleeps until 1000
   * 1000: reader reads 'abc', sleeps until 2000
   * 1000: writer writes 'def', blocks
   * 2000: reader reads 'def', sleeps until 3000
   * 2000: writer writes 'ghi', blocks
   * 3000: reader reads 'ghi', sleeps until 4000
   * 3000: writer writes 'jkl', returns
   * 4000: reader reads 'jkl', returns
   *
   * Because the writer is writing to a buffer, it finishes before the reader does.
   */
  @Test public void sinkBlocksOnSlowReader() throws Exception {
    final Pipe pipe = new Pipe(3L);
    executorService.execute(new Runnable() {
      @Override public void run() {
        try {
          Buffer buffer = new Buffer();
          Thread.sleep(1000L);
          assertEquals(3, pipe.source().read(buffer, Long.MAX_VALUE));
          assertEquals("abc", buffer.readUtf8());
          Thread.sleep(1000L);
          assertEquals(3, pipe.source().read(buffer, Long.MAX_VALUE));
          assertEquals("def", buffer.readUtf8());
          Thread.sleep(1000L);
          assertEquals(3, pipe.source().read(buffer, Long.MAX_VALUE));
          assertEquals("ghi", buffer.readUtf8());
          Thread.sleep(1000L);
          assertEquals(3, pipe.source().read(buffer, Long.MAX_VALUE));
          assertEquals("jkl", buffer.readUtf8());
        } catch (IOException | InterruptedException e) {
          throw new AssertionError();
        }
      }
    });

    double start = now();
    pipe.sink().write(new Buffer().writeUtf8("abcdefghijkl"), 12);
    assertElapsed(3000.0, start);
  }

  @Test public void sinkWriteFailsByClosedReader() throws Exception {
    final Pipe pipe = new Pipe(3L);
    executorService.schedule(new Runnable() {
      @Override public void run() {
        try {
          pipe.source().close();
        } catch (IOException e) {
          throw new AssertionError();
        }
      }
    }, 1000, TimeUnit.MILLISECONDS);

    double start = now();
    try {
      pipe.sink().write(new Buffer().writeUtf8("abcdef"), 6);
      fail();
    } catch (IOException expected) {
      assertEquals("source is closed", expected.getMessage());
      assertElapsed(1000.0, start);
    }
  }

  @Test public void sinkFlushDoesntWaitForReader() throws Exception {
    Pipe pipe = new Pipe(100L);
    pipe.sink().write(new Buffer().writeUtf8("abc"), 3);
    pipe.sink().flush();

    BufferedSource bufferedSource = Okio.buffer(pipe.source());
    assertEquals("abc", bufferedSource.readUtf8(3));
  }

  @Test public void sinkFlushFailsIfReaderIsClosedBeforeAllDataIsRead() throws Exception {
    Pipe pipe = new Pipe(100L);
    pipe.sink().write(new Buffer().writeUtf8("abc"), 3);
    pipe.source().close();
    try {
      pipe.sink().flush();
      fail();
    } catch (IOException expected) {
      assertEquals("source is closed", expected.getMessage());
    }
  }

  @Test public void sinkCloseFailsIfReaderIsClosedBeforeAllDataIsRead() throws Exception {
    Pipe pipe = new Pipe(100L);
    pipe.sink().write(new Buffer().writeUtf8("abc"), 3);
    pipe.source().close();
    try {
      pipe.sink().close();
      fail();
    } catch (IOException expected) {
      assertEquals("source is closed", expected.getMessage());
    }
  }

  @Test public void sinkClose() throws Exception {
    Pipe pipe = new Pipe(100L);
    pipe.sink().close();
    try {
      pipe.sink().write(new Buffer().writeUtf8("abc"), 3);
      fail();
    } catch (IllegalStateException expected) {
      assertEquals("closed", expected.getMessage());
    }
    try {
      pipe.sink().flush();
      fail();
    } catch (IllegalStateException expected) {
      assertEquals("closed", expected.getMessage());
    }
  }

  @Test public void sinkMultipleClose() throws Exception {
    Pipe pipe = new Pipe(100L);
    pipe.sink().close();
    pipe.sink().close();
  }

  @Test public void sinkCloseDoesntWaitForSourceRead() throws Exception {
    Pipe pipe = new Pipe(100L);
    pipe.sink().write(new Buffer().writeUtf8("abc"), 3);
    pipe.sink().close();

    BufferedSource bufferedSource = Okio.buffer(pipe.source());
    assertEquals("abc", bufferedSource.readUtf8());
    assertTrue(bufferedSource.exhausted());
  }

  @Test public void sourceClose() throws Exception {
    Pipe pipe = new Pipe(100L);
    pipe.source().close();
    try {
      pipe.source().read(new Buffer(), 3);
      fail();
    } catch (IllegalStateException expected) {
      assertEquals("closed", expected.getMessage());
    }
  }

  @Test public void sourceMultipleClose() throws Exception {
    Pipe pipe = new Pipe(100L);
    pipe.source().close();
    pipe.source().close();
  }

  @Test public void sourceReadUnblockedByClosedSink() throws Exception {
    final Pipe pipe = new Pipe(3L);
    executorService.schedule(new Runnable() {
      @Override public void run() {
        try {
          pipe.sink().close();
        } catch (IOException e) {
          throw new AssertionError();
        }
      }
    }, 1000, TimeUnit.MILLISECONDS);

    double start = now();
    Buffer readBuffer = new Buffer();
    assertEquals(-1, pipe.source().read(readBuffer, Long.MAX_VALUE));
    assertEquals(0, readBuffer.size());
    assertElapsed(1000.0, start);
  }

  /**
   * The writer has 12 bytes to write. It alternates sleeping 1000 ms, then writing 3 bytes. The
   * reader is reading as fast as it can. That should make for an approximate timeline like this:
   *
   *    0: writer sleeps until 1000
   *    0: reader blocks
   * 1000: writer writes 'abc', sleeps until 2000
   * 1000: reader reads 'abc'
   * 2000: writer writes 'def', sleeps until 3000
   * 2000: reader reads 'def'
   * 3000: writer writes 'ghi', sleeps until 4000
   * 3000: reader reads 'ghi'
   * 4000: writer writes 'jkl', returns
   * 4000: reader reads 'jkl', returns
   */
  @Test public void sourceBlocksOnSlowWriter() throws Exception {
    final Pipe pipe = new Pipe(100L);
    executorService.execute(new Runnable() {
      @Override public void run() {
        try {
          Thread.sleep(1000L);
          pipe.sink().write(new Buffer().writeUtf8("abc"), 3);
          Thread.sleep(1000L);
          pipe.sink().write(new Buffer().writeUtf8("def"), 3);
          Thread.sleep(1000L);
          pipe.sink().write(new Buffer().writeUtf8("ghi"), 3);
          Thread.sleep(1000L);
          pipe.sink().write(new Buffer().writeUtf8("jkl"), 3);
        } catch (IOException | InterruptedException e) {
          throw new AssertionError();
        }
      }
    });

    double start = now();
    Buffer readBuffer = new Buffer();

    assertEquals(3, pipe.source().read(readBuffer, Long.MAX_VALUE));
    assertEquals("abc", readBuffer.readUtf8());
    assertElapsed(1000.0, start);

    assertEquals(3, pipe.source().read(readBuffer, Long.MAX_VALUE));
    assertEquals("def", readBuffer.readUtf8());
    assertElapsed(2000.0, start);

    assertEquals(3, pipe.source().read(readBuffer, Long.MAX_VALUE));
    assertEquals("ghi", readBuffer.readUtf8());
    assertElapsed(3000.0, start);

    assertEquals(3, pipe.source().read(readBuffer, Long.MAX_VALUE));
    assertEquals("jkl", readBuffer.readUtf8());
    assertElapsed(4000.0, start);
  }

  /** Returns the nanotime in milliseconds as a double for measuring timeouts. */
  private double now() {
    return System.nanoTime() / 1000000.0d;
  }

  /**
   * Fails the test unless the time from start until now is duration, accepting differences in
   * -50..+450 milliseconds.
   */
  private void assertElapsed(double duration, double start) {
    assertEquals(duration, now() - start - 200d, 250.0);
  }
}
